---
title: 传输
description: 了解 MCP 的通信机制
---

模型上下文协议（MCP）中的传输为客户端与服务器之间的通信提供了基础。传输层负责处理消息发送和接收的底层机制。

## 消息格式

MCP 使用 [JSON-RPC](https://www.jsonrpc.org/) 2.0 作为其传输格式。传输层负责将 MCP 协议消息转换为 JSON-RPC 格式进行传输，并将接收到的 JSON-RPC 消息转换回 MCP 协议消息。

使用的 JSON-RPC 消息有三种类型：

### 请求
```typescript
{
  jsonrpc: "2.0",
  id: number | string,
  method: string,
  params?: object
}
```

### 响应
```typescript
{
  jsonrpc: "2.0",
  id: number | string,
  result?: object,
  error?: {
    code: number,
    message: string,
    data?: unknown
  }
}
```

### 通知
```typescript
{
  jsonrpc: "2.0",
  method: string,
  params?: object
}
```

## 内置传输类型

MCP 包括两种标准传输实现：

### 标准输入/输出（stdio）

stdio 传输通过标准输入和输出流实现通信。这对于本地集成和命令行工具尤为有用。

在以下情况下使用 stdio：
- 构建命令行工具
- 实现本地集成
- 需要简单的进程通信
- 处理 shell 脚本

<Tabs>
  <Tab title="TypeScript (服务器)">
    ```typescript
    const server = new Server({
      name: "example-server",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new StdioServerTransport();
    await server.connect(transport);
    ```
  </Tab>
  <Tab title="TypeScript (客户端)">
    ```typescript
    const client = new Client({
      name: "example-client",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new StdioClientTransport({
      command: "./server",
      args: ["--option", "value"]
    });
    await client.connect(transport);
    ```
  </Tab>
  <Tab title="Python (服务器)">
    ```python
    app = Server("example-server")

    async with stdio_server() as streams:
        await app.run(
            streams[0],
            streams[1],
            app.create_initialization_options()
        )
    ```
  </Tab>
  <Tab title="Python (客户端)">
    ```python
    params = StdioServerParameters(
        command="./server",
        args=["--option", "value"]
    )

    async with stdio_client(params) as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            await session.initialize()
    ```
  </Tab>
</Tabs>

### 服务器发送事件（SSE）

SSE 传输支持服务器到客户端的流式传输，并使用 HTTP POST 请求进行客户端到服务器的通信。

在以下情况下使用 SSE：
- 只需要服务器到客户端的流式传输
- 在受限网络中工作
- 实现简单更新

#### 安全警告：DNS 重绑定攻击

如果未妥善保护，SSE 传输可能容易受到 DNS 重绑定攻击。为防止此类攻击：

1. **始终验证传入 SSE 连接的 Origin 头部**，确保它们来自预期来源
2. **避免将服务器绑定到所有网络接口**（0.0.0.0）在本地运行时，仅绑定到 localhost（127.0.0.1）
3. **为所有 SSE 连接实施适当的认证**

若不采取这些保护措施，攻击者可能通过 DNS 重绑定从远程网站与本地 MCP 服务器交互。

<Tabs>
  <Tab title="TypeScript (服务器)">
    ```typescript
    import express from "express";
    
    const app = express();
    
    const server = new Server({
      name: "example-server",
      version: "1.0.0"
    }, {
      capabilities: {}
    });
    
    let transport: SSEServerTransport | null = null;

    app.get("/sse", (req, res) => {
      transport = new SSEServerTransport("/messages", res);
      server.connect(transport);
    });

    app.post("/messages", (req, res) => {
      if (transport) {
        transport.handlePostMessage(req, res);
      }
    });

    app.listen(3000);
    ```
  </Tab>
  <Tab title="TypeScript (客户端)">
    ```typescript
    const client = new Client({
      name: "example-client",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new SSEClientTransport(
      new URL("http://localhost:3000/sse")
    );
    await client.connect(transport);
    ```
  </Tab>
  <Tab title="Python (服务器)">
    ```python
    from mcp.server.sse import SseServerTransport
    from starlette.applications import Starlette
    from starlette.routing import Route

    app = Server("example-server")
    sse = SseServerTransport("/messages")

    async def handle_sse(scope, receive, send):
        async with sse.connect_sse(scope, receive, send) as streams:
            await app.run(streams[0], streams[1], app.create_initialization_options())

    async def handle_messages(scope, receive, send):
        await sse.handle_post_message(scope, receive, send)

    starlette_app = Starlette(
        routes=[
            Route("/sse", endpoint=handle_sse),
            Route("/messages", endpoint=handle_messages, methods=["POST"]),
        ]
    )
    ```
  </Tab>
  <Tab title="Python (客户端)">
    ```python
    async with sse_client("http://localhost:8000/sse") as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            await session.initialize()
    ```
  </Tab>
</Tabs>

## 自定义传输

MCP 便于为特定需求实现自定义传输。任何传输实现只需符合传输接口即可：

您可以为以下目的实现自定义传输：
- 自定义网络协议
- 专用通信通道
- 与现有系统集成
- 性能优化

<Tabs>
  <Tab title="TypeScript">
    ```typescript
    interface Transport {
      // 开始处理消息
      start(): Promise<void>;

      // 发送 JSON-RPC 消息
      send(message: JSONRPCMessage): Promise<void>;

      // 关闭连接
      close(): Promise<void>;

      // 回调
      onclose?: () => void;
      onerror?: (error: Error) => void;
      onmessage?: (message: JSONRPCMessage) => void;
    }
    ```
  </Tab>
  <Tab title="Python">
    请注意，尽管 MCP 服务器通常使用 asyncio 实现，但我们建议使用 `anyio` 实现传输等低级接口，以获得更广泛的兼容性。
    ```python
    @contextmanager
    async def create_transport(
        read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
        write_stream: MemoryObjectSendStream[JSONRPCMessage]
    ):
        """
        MCP 的传输接口。

        参数：
            read_stream: 用于读取传入消息的流
            write_stream: 用于写入传出消息的流
        """
        async with anyio.create_task_group() as tg:
            try:
                # 开始处理消息
                tg.start_soon(lambda: process_messages(read_stream))

                # 发送消息
                async with write_stream:
                    yield write_stream

            except Exception as exc:
                # 处理错误
                raise exc
            finally:
                # 清理
                tg.cancel_scope.cancel()
                await write_stream.aclose()
                await read_stream.aclose()
    ```
  </Tab>
</Tabs>

## 错误处理

传输实现应处理各种错误场景：

1. 连接错误
2. 消息解析错误
3. 协议错误
4. 网络超时
5. 资源清理

错误处理示例：

<Tabs>
  <Tab title="TypeScript">
    ```typescript
    class ExampleTransport implements Transport {
      async start() {
        try {
          // 连接逻辑
        } catch (error) {
          this.onerror?.(new Error(`连接失败：${error}`));
          throw error;
        }
      }

      async send(message: JSONRPCMessage) {
        try {
          // 发送逻辑
        } catch (error) {
          this.onerror?.(new Error(`发送消息失败：${error}`));
          throw error;
        }
      }
    }
    ```
  </Tab>
  <Tab title="Python">
    请注意，尽管 MCP 服务器通常使用 asyncio 实现，但我们建议使用 `anyio` 实现传输等低级接口，以获得更广泛的兼容性。
    ```python
    @contextmanager
    async def example_transport(scope: Scope, receive: Receive, send: Send):
        try:
            # 创建用于双向通信的流
            read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
            write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

            async def message_handler():
                try:
                    async with read_stream_writer:
                        # 消息处理逻辑
                        pass
                except Exception as exc:
                    logger.error(f"处理消息失败：{exc}")
                    raise exc

            async with anyio.create_task_group() as tg:
                tg.start_soon(message_handler)
                try:
                    # 为通信提供流
                    yield read_stream, write_stream
                except Exception as exc:
                    logger.error(f"传输错误：{exc}")
                    raise exc
                finally:
                    tg.cancel_scope.cancel()
                    await write_stream.aclose()
                    await read_stream.aclose()
        except Exception as exc:
            logger.error(f"初始化传输失败：{exc}")
            raise exc
    ```
  </Tab>
</Tabs>

## 最佳实践

实现或使用 MCP 传输时：

1. 正确处理连接生命周期
2. 实现适当的错误处理
3. 在连接关闭时清理资源
4. 使用适当的超时
5. 在发送前验证消息
6. 记录传输事件以便调试
7. 在适当时候实现重连逻辑
8. 处理消息队列中的背压
9. 监控连接健康状态
10. 实施适当的安全措施

## 安全考虑

实现传输时：

### 认证和授权
- 实施适当的认证机制
- 验证客户端凭据
- 使用安全的令牌处理
- 实施授权检查

### 数据安全
- 对网络传输使用 TLS
- 加密敏感数据
- 验证消息完整性
- 实施消息大小限制
- 清理输入数据

### 网络安全
- 实施速率限制
- 使用适当的超时
- 处理拒绝服务场景
- 监控异常模式
- 实施适当的防火墙规则
- 对于 SSE 传输，验证 Origin 头部以防止 DNS 重绑定攻击
- 对于本地 SSE 服务器，仅绑定到 localhost（127.0.0.1），而不是所有接口（0.0.0.0）

## 调试传输

调试传输问题的技巧：

1. 启用调试日志
2. 监控消息流
3. 检查连接状态
4. 验证消息格式
5. 测试错误场景
6. 使用网络分析工具
7. 实施健康检查
8. 监控资源使用情况
9. 测试边缘情况
10. 使用适当的错误跟踪
```

---